fix: Prevent deadlock in subprocess executor by allowing concurrent I/O #901
Conversation
The previous implementation called `input_thread.join()` before reading from stdout, which could cause a deadlock when:

1. The input thread writes to stdin
2. The subprocess processes the input and writes to stdout
3. If the stdout buffer fills up, the subprocess blocks
4. But we're waiting for the input thread to finish before reading stdout
5. The input thread can't finish because the subprocess is blocked on stdout

This fix allows input and output to happen concurrently by moving `input_thread.join()` to the `finally` block, after reading is complete.

Closes #871

Co-Authored-By: AJ Steers <aj@airbyte.io>
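A minimal sketch of the fixed ordering (illustrative only, not the actual PyAirbyte executor; the echo child process and the `stream_through` helper are invented for this demo): stdin is fed from a background thread while the main thread drains stdout, and the writer thread is joined only in the `finally` block. Joining before reading would deadlock here, because 50,000 lines comfortably exceed a typical OS pipe buffer.

```python
import subprocess
import sys
import threading


def stream_through(lines):
    """Yield the child's stdout lines while stdin is fed concurrently."""
    proc = subprocess.Popen(
        [sys.executable, "-c",
         "import sys\nfor line in sys.stdin: sys.stdout.write(line)"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )

    def feed_stdin():
        try:
            for line in lines:
                proc.stdin.write(line + "\n")
        finally:
            proc.stdin.close()  # signal EOF so the child can exit

    writer = threading.Thread(target=feed_stdin, daemon=True)
    writer.start()
    try:
        # Read stdout *while* the writer runs; joining the writer first
        # risks deadlock once the OS pipe buffer fills with unread output.
        yield from proc.stdout
    finally:
        writer.join(timeout=10)
        proc.wait(timeout=10)


out = list(stream_through(f"record-{i}" for i in range(50_000)))
print(len(out))  # one output line per input line
```

Moving the `join()` after the read loop is the entire fix: the writer and reader make progress together, so neither pipe can fill up and stall the other side.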
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

```shell
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765249790-fix-executor-deadlock' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765249790-fix-executor-deadlock'
```

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.
Actionable comments posted: 2
🧹 Nitpick comments (1)
airbyte/_executors/base.py (1)
72-78: Nice docstring explaining the deadlock scenario

The docstring clearly captures the stdout-buffer deadlock and the rationale for concurrent stdin/stdout handling; this should make future maintenance much easier, and the behavior matches the new control flow as far as I can see. 👍 Wdyt about keeping this as the single source of truth and linking to issue #871 here for extra context, or is that overkill?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/_executors/base.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (1)
airbyte/_executors/base.py (1)
100-102: Comment accurately documents the key concurrency change

The inline comment about not joining the input thread immediately matches the new behavior and the deadlock analysis from the PR description; letting the stdin thread run while stdout is consumed should avoid the prior pipe back-pressure stall. I don't see any mismatch between the comment and the code here, so this looks good to me. Anything specific you'd want to mention about the subsequent timeout join behavior, or is that sufficiently covered by the code below already, wdyt?
Devin, please review bot feedback.
…ror wrapper

Addresses CodeRabbit feedback: check the exit code first and raise AirbyteSubprocessFailedError (with the input thread exception as original_exception) before raising raw input thread exceptions. This ensures callers that catch AirbyteSubprocessFailedError receive the proper wrapped exception.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Reviewed and addressed the CodeRabbit feedback. The bot correctly identified that the exception handling order was wrong: I was raising the raw input thread exception before checking the exit code, which would bypass the AirbyteSubprocessFailedError wrapper.

Fix (commit d26a8a1): Reordered the checks so that the exit code is inspected first, and any input thread exception is attached as original_exception when the subprocess failed.

This ensures callers that catch AirbyteSubprocessFailedError receive the proper wrapped exception. All 218 unit tests pass locally.
Per CodeRabbit feedback: if input_thread.join(timeout=10) expires and the thread is still alive, a non-daemon thread would block Python interpreter shutdown. Setting daemon=True ensures the thread won't prevent process shutdown while still allowing graceful cleanup via the join timeout. Co-Authored-By: AJ Steers <aj@airbyte.io>
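A small sketch of what the daemon-thread change buys (the `feed_input` helper and thread name below are illustrative, not PyAirbyte's actual internals): a daemon thread cannot keep the interpreter alive, so even if the bounded join times out, a stuck writer won't block process shutdown.

```python
import threading
import time


def feed_input() -> None:
    # Stand-in for writing records to the subprocess stdin.
    time.sleep(0.05)


input_thread = threading.Thread(target=feed_input, name="stdin-writer", daemon=True)
input_thread.start()

# ... stdout would be consumed here, concurrently with the writer ...

input_thread.join(timeout=10)  # bounded wait for graceful cleanup
print(input_thread.daemon, input_thread.is_alive())  # True False
```

The `daemon=True` flag and the `join(timeout=...)` are complementary: the timeout gives the thread a window to finish cleanly, and the daemon flag guarantees interpreter exit isn't held hostage if it doesn't.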
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte/_executors/base.py (1)
140-142: Thread join placement looks correct!

Joining the input thread after process termination (with a 10-second timeout) makes sense. Once the process is terminated, stdin closes, causing the input thread to exit.
One optional thought: would it be helpful to log a debug message if the join times out and the thread is still alive? This could aid troubleshooting if the input thread ever gets stuck, wdyt?
Example:

```python
if input_thread is not None:
    input_thread.join(timeout=10)
    if input_thread.is_alive():
        # Optional: log that the thread is still alive but, as a daemon,
        # it will not block shutdown.
        pass
```
🔇 Additional comments (4)
airbyte/_executors/base.py (4)
72-78: Excellent documentation of the concurrent I/O approach!

The updated docstring clearly explains why the input thread runs concurrently with stdout reading, making the deadlock prevention strategy explicit for future maintainers.
91-102: LGTM - Daemon thread and concurrent I/O strategy properly implemented!

The daemon thread setting (line 98) and the deferred join approach (lines 101-102) correctly address the deadlock issue and prevent interpreter shutdown hangs. This incorporates the feedback from the previous review.
147-165: Exception handling logic is spot-on!

The reordered exception handling correctly prioritizes subprocess failure (lines 147-159) over input thread exceptions (lines 161-165), ensuring that AirbyteSubprocessFailedError is raised when the subprocess fails, with the input thread exception attached as context when appropriate.

This properly addresses the previous review feedback and maintains the error-handling contract for callers like airbyte/_connector_base.py:493 that catch AirbyteSubprocessFailedError.

The logic correctly handles all scenarios:

- Subprocess failure always raises AirbyteSubprocessFailedError (with the input exception as context if relevant)
- Input thread exceptions only surface when the subprocess succeeded
- BrokenPipeError is appropriately ignored during graceful shutdown
64-169: Overall implementation successfully fixes the deadlock issue!

This PR correctly addresses the root cause of issue #871 by enabling concurrent stdin/stdout I/O. The key improvements:

- ✅ Input thread runs concurrently without blocking stdout reading (prevents pipe deadlock)
- ✅ Daemon thread prevents interpreter shutdown hangs if the thread gets stuck
- ✅ Thread join moved to the finally block with a timeout for proper cleanup
- ✅ Exception handling prioritizes subprocess failures while preserving input thread context
- ✅ BrokenPipeError appropriately ignored during graceful shutdown

Both previous review concerns have been incorporated, and the implementation handles all error scenarios correctly. The code is well-documented and maintains proper resource cleanup.
Summary
Fixes a potential deadlock in _stream_from_subprocess that could cause destination writes to hang indefinitely (reported in #871 as a MySQL → Postgres sync hanging during the write phase).

Root cause: The previous implementation called input_thread.join() immediately after starting the input thread, before reading from stdout. This creates a classic pipe deadlock.

Fix: Move input_thread.join() to the finally block after reading is complete, allowing input and output to happen concurrently.

Closes #871
Updates since last revision
Addressed CodeRabbit feedback on exception handling order: the exit code is now checked first, before raising input thread exceptions. This ensures AirbyteSubprocessFailedError is raised (with the input thread exception as original_exception) when the subprocess fails, preserving the error-handling contract for callers that catch AirbyteSubprocessFailedError.

Review & Testing Checklist for Human
- Verify callers that catch AirbyteSubprocessFailedError still receive the wrapped exception

Recommended test plan:

- Run python examples/run_faker.py with a large record count

Notes

- input_thread.join() in the finally block prevents indefinite hangs during cleanup

Requested by: AJ Steers (@aaronsteers)
Link to Devin run: https://app.devin.ai/sessions/5094f54055bd4fc68a357aa705357fce
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.